Skip to content

[FLINK-37443] Add returns() method to DataStream V2 API for specifying output types with lambda expressions#26284

Open
nilmadhab wants to merge 10 commits intoapache:masterfrom
nilmadhab:FLINK-37443/add-returns-method-datastreamv2
Open

[FLINK-37443] Add returns() method to DataStream V2 API for specifying output types with lambda expressions#26284
nilmadhab wants to merge 10 commits intoapache:masterfrom
nilmadhab:FLINK-37443/add-returns-method-datastreamv2

Conversation

@nilmadhab
Copy link
Contributor

What is the purpose of the change

(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)

Brief change log

(for example:)

  • The TaskInfo is stored in the blob store on job creation time as a persistent artifact
  • Deployments RPC transmits only the blob storage reference
  • TaskManagers retrieve the TaskInfo from the blob cache

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (100MB)
  • Extended integration test for recovery after master (JobManager) failure
  • Added test that validates that TaskInfo is transferred only once across recoveries
  • Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@nilmadhab nilmadhab changed the title Trying out using lambda function in dataStreamV2 tests FLINK-37443 Trying out using lambda function in dataStreamV2 tests Mar 11, 2025
@flinkbot
Copy link
Collaborator

flinkbot commented Mar 11, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@nilmadhab nilmadhab marked this pull request as ready for review March 13, 2025 20:34
@nilmadhab nilmadhab changed the title FLINK-37443 Trying out using lambda function in dataStreamV2 tests [FLINK-37443] Add returns() method to DataStream V2 API for specifying output types with lambda expressions Mar 13, 2025
@nilmadhab
Copy link
Contributor Author

@flinkbot run azure

@nilmadhab
Copy link
Contributor Author

@reswqa can you please review it?

@reswqa
Copy link
Member

reswqa commented Mar 19, 2025

cc @codenohup

</dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The API modules should not depend on non-API modules, because this would require users to depend on the flink-core module when developing DataStream V2 programs, but flink-core should only be used at application runnning.

You might consider moving TypeInformation to the API module, such as core-api. However, I understand that this is challenging due to its dependencies on numerous other classes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can copy it, but will it be okay to have these duplicate code in codebase? Or is there any other better way to do that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dependency of flink-core module through usingTypeInformation is only used in the interfaces located in flink-datastream-api, as the implementations are defined in flink-datastream module which are already using flink-core module.

I think it does not make much sense to copy a large number of files and maintain it just for a small feature and that too only for defining interfaces. I am open to other ideas.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dependency of flink-core module through usingTypeInformation is only used in the interfaces located in flink-datastream-api, as the implementations are defined in flink-datastream module which are already using flink-core module.

I think it does not make much sense to copy a large number of files and maintain it just for a small feature and that too only for defining interfaces. I am open to other ideas.

Sorry for the delayed reply. I think we can introduce a class, such as TypeHint, at the API level that allows users to define a class with generics. Subsequently, this TypeHint can be converted into TypeInformation at the implementation level.
During this process, you can leverage TypeDescriptor to describe common types within the flink-datastream-api. Additionally, it would be advantageous to incorporate the tuple type into TypeDescriptor.

* @param typeInfo type information as a return type hint
* @return This operator with a given return type hint.
*/
NonKeyedPartitionStream<T> returns(TypeInformation<T> typeInfo);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KeyedPartitionStream and GlobalStream also need returns method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay I will add it,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay I will add it,

BroadcastStream also need returns method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can it be just part of datastream interface? Then it will extend all other streams by default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can it be just part of datastream interface? Then it will extend all other streams by default.

Thank you for the reminder. You can add the withReturnType method to the ProcessConfigurable interface, as it proves to be useful in most scenarios. Regarding situations with two output streams, users can obtain both streams individually and then apply the withReturnType method to each. Here's an example to illustrate:

NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream<Integer, String>
                twoOutputStream = ...;
twoOutputStream.getFirst().withReturnType(TypeDescriptor.INT).process(...);
twoOutputStream.getSecond().withReturnType(TypeDescriptor.String).process(...);

* DataStream API. It is currently in the experimental stage and is not fully available for
* production.
*/
public class WordCountUsingLambda {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you should rewrite the previous example to use lambda expressions instead of rewriting an almost identical example.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I was thinking the same !

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed !

Copy link
Contributor

@codenohup codenohup left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @nilmadhab
Thank you for your contribution! I've left some comments, PTAL.

@nilmadhab
Copy link
Contributor Author

Any other examples I can modify or test case to add? @codenohup

@nilmadhab
Copy link
Contributor Author

@flinkbot run azure

@codenohup
Copy link
Contributor

Any other examples I can modify or test case to add? @codenohup

It is best to modify the original example/test case. If it does not meet your needs, you can also add a new example/test case.

@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed component=API/DataStream community-reviewed PR has been reviewed by the community. labels Jun 30, 2025
@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Jul 1, 2025
@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Jul 17, 2025
@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Jul 24, 2025
@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Aug 15, 2025
@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Aug 26, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants